跳到主要内容

Go 语言并发学习-并发模式 Runner、Pool、Work

Go 语言实现的三种常用并发模式;这些模式可以在实际生产应用中合理使用,免去了我们造轮子的过程。

  • Runner
  • Pool
  • Work

Runner 定时任务

在 Golang 中,Runner 是一种常见的并发模式,用于管理和控制多个并发任务的执行。Runner 模式通过协调和监控多个任务的执行,提供了一种简单可靠的方式来处理并发任务。

Runner 模式通常由一个 Runner 结构体表示,其中包含了任务的配置和状态信息。Runner 结构体通常具有以下属性:

  • tasks:一个包含多个任务的切片。
  • complete:一个通道,用于接收完成任务的信号。
  • timeout:任务的超时时间,控制任务的最长执行时间。
  • interrupt:一个通道,用于接收中断信号,用于取消执行任务。

Runner 模式的核心思想是使用一个无限循环来不断地执行任务,并通过信号来控制任务的执行和结束。

下面是一个简单的示例代码,演示了如何实现一个 Runner:

package main

import (
"fmt"
"time"
)

type Runner struct {
tasks []func(int)
complete chan int
timeout <-chan time.Time
interrupt chan struct{}
}

func NewRunner(duration time.Duration) *Runner {
return &Runner{
complete: make(chan int),
timeout: time.After(duration),
interrupt: make(chan struct{}),
}
}

func (r *Runner) AddTask(task func(int)) {
r.tasks = append(r.tasks, task)
}

func (r *Runner) run() {
for id, task := range r.tasks {
select {
case <-r.interrupt:
return
default:
task(id)
r.complete <- id
}
}
}

func (r *Runner) Start() int {
go r.run()

select {
case <-r.timeout:
close(r.interrupt)
fmt.Println("Timeout! Aborting...")
return -1
case id := <-r.complete:
return id
}
}

func main() {
runner := NewRunner(3 * time.Second)

runner.AddTask(func(id int) {
fmt.Printf("Executing task %d\n", id)
time.Sleep(time.Duration(id) * time.Second)
})

runner.AddTask(func(id int) {
fmt.Printf("Executing task %d\n", id)
time.Sleep(time.Duration(id) * time.Second)
})

if id := runner.Start(); id != -1 {
fmt.Printf("Task %d completed\n", id)
}
}

在上述示例中,我们创建了一个 Runner,并添加了两个任务。每个任务只是简单地打印任务的编号,并根据编号进行相应的睡眠时间。

Runner 的 Start 方法用于启动任务执行。它会在超时时间内等待任务的完成,并返回完成的任务编号。如果超时发生,则会中断正在执行的任务,并返回 -1。

通过使用 Runner 模式,我们可以在控制并发任务的执行、设置超时时间、中断任务等方面提供一定程度的灵活性和可控性。这种模式在需要管理多个并发任务的场景中非常有用,如并发爬虫、并发数据处理等。

优化为并发模式

在上述示例中,Runner 模式是按顺序一个个执行任务的。这是因为在 run 方法中,每个任务都会在前一个任务完成后才会执行。

如果希望并发执行多个任务,可以对 run 方法进行修改,使用 goroutine 来并发执行任务。这样可以让多个任务同时运行,而不是一个个按顺序执行。

下面是修改后的示例代码,实现并发执行任务的 Runner 模式:

func (r *Runner) run() {
var wg sync.WaitGroup
for id, task := range r.tasks {
wg.Add(1)
go func(id int, task func(int)) {
defer wg.Done()
select {
case <-r.interrupt:
return
default:
task(id)
r.complete <- id
}
}(id, task)
}
wg.Wait()
}

在修改后的代码中,我们使用 sync.WaitGroup 来等待所有任务的完成。每个任务都在一个单独的 goroutine 中执行,并通过 wg.Add(1) 增加计数器。在每个任务完成时,我们调用 wg.Done() 来减少计数器。

通过这种方式,我们可以实现并发执行多个任务,而不是一个个按顺序执行。当所有任务都完成后,wg.Wait() 会阻塞,直到所有任务完成。

这种并发执行任务的 Runner 模式适用于需要同时处理多个任务,并希望充分利用并发性能的场景。请根据实际需求选择适合的执行方式。

Pool 缓存池

缓存池的概念随处可见。

在 Go 1.6 及之后的版本中,标准库里自带了资源池的实现

sync.Pool 的使用方式非常简单:

只需要实现 New 函数即可。对象池中没有对象时,将会调用 New 函数创建。

var studentPool = sync.Pool{
New: func() interface{} {
return new(Student) // 例如创建 Student 对象
},
}

取得对象和归还对象

stu := studentPool.Get().(*Student)
json.Unmarshal(buf, stu) // 使用线程池的对象
studentPool.Put(stu)

Get() 用于从对象池中获取对象,因为返回值是 interface{},因此需要类型转换。 Put() 则是在对象使用完毕后,返回对象池。

Worker 并发模式

在 Golang 中,Worker 模式是一种常见的并发模式,用于管理和执行一组工作任务。Worker 模式适用于需要并发处理一组独立任务的情况,其中每个任务可以独立运行而无需与其他任务进行通信。

提示

Worker 模式在 Golang 中通常用于解决以下常见的并发问题:

  1. 并发任务处理:当需要并发地处理一组独立任务时,可以使用 Worker 模式。每个任务可以在独立的工作者中执行,从而实现任务的并发处理。

  2. 并行计算:当有大量相似或独立的计算任务时,可以使用 Worker 模式将计算任务分发给多个工作者并行执行。这样可以提高计算性能和并行度。

  3. 资源池管理:如果需要对资源进行管理,例如数据库连接、网络连接或文件句柄,可以使用 Worker 模式来管理资源池。每个工作者可以从资源池中获取资源并执行相应的任务,完成后将资源返回给资源池。

  4. 批量任务处理:当需要处理大量的批量任务时,可以使用 Worker 模式来并发地处理任务。通过将任务分发给多个工作者,并行地处理任务,可以提高处理效率和吞吐量。

  5. 并发请求处理:在网络编程中,当需要处理大量并发请求时,可以使用 Worker 模式来处理请求。每个工作者可以独立处理一个请求,从而实现请求的并发处理。

Worker 模式提供了一种简单而有效的方式来处理并发任务,并充分利用多核和并发性能。它能够提高系统的响应能力、并行度和吞吐量,同时保持代码的简洁性和可读性。因此,在许多并发场景中,使用 Worker 模式是一种常见的选择。

Worker 模式通常由以下组件组成:

  1. 任务队列(Task Queue):用于存储待执行的任务。可以使用通道(Channel)或其他数据结构(如队列或切片)来实现。

  2. 工作者池(Worker Pool):一组工作者(Worker)的集合,每个工作者可以并发地从任务队列中取出任务并执行。

  3. 结果通道(Result Channel):用于接收任务的执行结果。每个工作者在执行完任务后,将结果发送到结果通道中。

下面是一个简单的示例代码,演示了如何实现一个 Worker 模式:

package main

import (
"fmt"
"sync"
)

type Worker struct {
ID int
TaskQueue chan Task
Results chan Result
Quit chan struct{}
}

type Task struct {
ID int
Data interface{}
}

type Result struct {
TaskID int
Data interface{}
}

func NewWorker(id int, taskQueue chan Task, results chan Result) *Worker {
return &Worker{
ID: id,
TaskQueue: taskQueue,
Results: results,
Quit: make(chan struct{}),
}
}

func (w *Worker) Start(wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case task := <-w.TaskQueue:
result := w.processTask(task)
w.Results <- result
case <-w.Quit:
return
}
}
}()
}

func (w *Worker) processTask(task Task) Result {
// 执行任务的逻辑
result := Result{
TaskID: task.ID,
Data: fmt.Sprintf("Task %d processed by Worker %d", task.ID, w.ID),
}
return result
}

func main() {
taskQueue := make(chan Task)
results := make(chan Result)
numWorkers := 3
var wg sync.WaitGroup

// 创建工作者池
for i := 0; i < numWorkers; i++ {
worker := NewWorker(i, taskQueue, results)
worker.Start(&wg)
}

// 添加任务到任务队列
for i := 0; i < 10; i++ {
task := Task{
ID: i,
Data: fmt.Sprintf("Task %d", i),
}
taskQueue <- task
}

// 关闭任务队列,等待所有任务完成
close(taskQueue)
wg.Wait()

// 处理任务结果
for i := 0; i < 10; i++ {
result := <-results
fmt.Println(result.Data)
}
close(results)
}

在上述示例中,我们创建了一个工作者池,其中包含三个工作者(Worker)。每个工作者都从任务队列(Task Queue)中取出任务,并通过执行 processTask 函数来处理任务。处理完成后,工作者将结果发送到结果通道(Result Channel)中。

主函数负责创建任务并添加到任务队列中,然后关闭任务队列并等待所有任务完成。最后,从结果通道中接收并处理任务的执行结果。

Worker 模式在需要并发执行一组独立任务的场景中非常有用。通过使用工作者池和任务队列,可以方便地管理和控制并发任务的执行,并且能够有效地利用系统资源。